// DwsTradePaySucStats.java
package com.lhd.app.dws;

import com.alibaba.fastjson.JSONObject;
import com.lhd.bean.PaySucStats;
import com.lhd.common.utils.MyClickHouseUtil;
import com.lhd.common.utils.MyKafkaUtil;
import com.lhd.common.utils.MyKafkaUtil2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class DwsTradePaySucStats {

    public static void main(String[] args) throws Exception {
        
        // 1. 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. 创建 Kafka 源表
        String sourceTopic = "dwd_pay_suc_stats2";
        String groupId = "dws_pay_suc_stats_group";
        DataStreamSource<String> stringDataStreamSource = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(sourceTopic, groupId));
// 将原始的字符串流转换为实体类流
        SingleOutputStreamOperator<PaySucStats> paySucStatsStream = stringDataStreamSource.map(jsonString -> {
            // 使用FastJSON将JSON字符串转换为JSONObject
            JSONObject jsonObject = JSONObject.parseObject(jsonString);

            // 创建PaySucStats实体类并设置属性
            PaySucStats paySucStats = new PaySucStats();
            paySucStats.setStartTime(jsonObject.getString("start_time"));
            paySucStats.setEndTime(jsonObject.getString("end_time"));
            paySucStats.setPaySucUc(jsonObject.getLong("pay_suc_uc"));
            paySucStats.setPaySucAmount(jsonObject.getDouble("pay_suc_amount"));

            return paySucStats;
        });

// 打印实体类流进行验证
        paySucStatsStream.print();
        paySucStatsStream.addSink(MyClickHouseUtil.getSinkFunction("insert into dws_pay_suc_stats values(?,?,?,?)"));

        // 7. 执行任务
        env.execute("DwsTradePaySucStats");
    }
}